package gu.simplemq.mqtt;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.Assert;
import org.junit.Test;

import com.google.common.collect.ImmutableMap;

import gu.simplemq.Channel;
import gu.simplemq.IMessageQueueFactory;
import gu.simplemq.exceptions.SmqUnsubscribeException;

/**
 * Manual integration tests for MQTT subscription.
 * <p>
 * Both tests connect to the broker at {@link #MQTT_HOST}, subscribe, and then
 * block on standard input until the operator types {@code quit} (or standard
 * input reaches end-of-stream), logging every message received in the meantime.
 *
 * @author guyadong
 *
 */
public class MqttSubscriberTest implements MqttConstants{
	private static final String MQTT_HOST = "192.168.10.226";

	/** Quality-of-service level requested for every subscription made by {@link #sub}. */
	private static final int QOS = 2;
	/** Broker URI built from {@link #MQTT_HOST} and {@code DEFAULT_MQTT_PORT}. */
	private static final String BROKER = "tcp://" + MQTT_HOST + ":" + DEFAULT_MQTT_PORT;

	/**
	 * Creates a Paho client with in-memory persistence and connects it to {@link #BROKER}
	 * using a non-clean session.
	 *
	 * @param clientId MQTT client identifier
	 * @return the connected client; the caller is responsible for disconnecting and closing it
	 * @throws MqttException if the client cannot be created or the connection fails
	 */
	private static MqttClient connect(String clientId) throws MqttException{
		MqttConnectOptions connOpts = new MqttConnectOptions();
		connOpts.setCleanSession(false);
		// No credentials are configured; set user name / password on connOpts here
		// if the broker requires authentication.
		connOpts.setConnectionTimeout(10);
		connOpts.setKeepAliveInterval(20);
		MqttClient mqttClient = new MqttClient(BROKER, clientId, new MemoryPersistence());
		try {
			mqttClient.connect(connOpts);
		} catch (MqttException e) {
			// Do not leak the half-initialised client when the broker is unreachable.
			try {
				mqttClient.close();
			} catch (MqttException ignored) {
				// The connect failure is the error worth reporting; a secondary
				// failure while releasing the client is deliberately dropped.
			}
			throw e;
		}
		return mqttClient;
	}

	/**
	 * Subscribes the client to a single topic at {@link #QOS}.
	 *
	 * @param mqttClient connected client
	 * @param topic topic filter to subscribe to
	 * @throws MqttException if the subscription fails
	 */
	public static void sub(MqttClient mqttClient,String topic) throws MqttException{
		mqttClient.subscribe(new String[] {topic}, new int[] {QOS});
	}

	/**
	 * Blocks until a line equal to {@code quit} (case-insensitive) is read from
	 * standard input, or until standard input reaches end-of-stream.
	 * <p>
	 * The method returns normally instead of terminating the JVM so that callers
	 * can release their resources and the test runner can report a result.
	 *
	 * @throws IOException if reading standard input fails
	 */
	private static void waitquit() throws IOException{
		System.out.println("PRESS 'quit' OR 'CTRL-C' to exit");
		// Deliberately not closed: closing the reader would also close System.in.
		BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
		String line;
		// readLine() returns null at end-of-stream; without this check the loop
		// would spin forever when standard input is closed or redirected.
		while((line = reader.readLine()) != null){
			if("quit".equalsIgnoreCase(line)){
				return;
			}
		}
	}

	/**
	 * Subscribes to several topics with the raw Paho client and logs incoming
	 * messages until the operator quits. Any failure propagates to JUnit and
	 * fails the test.
	 */
	@Test
	public void test1() throws Exception {
		MqttClient mqttClient = connect("testSub");
		try {
			mqttClient.setCallback(new MyCallback());

			sub(mqttClient,"ActiveMQ.Advisory.Consumer.Topic.chat1");
			sub(mqttClient,"chat1");
			sub(mqttClient,"chat2");
			sub(mqttClient,"chat3");
			waitquit();
		} finally {
			// Always release the connection, even when subscribing or waiting failed.
			try {
				if (mqttClient.isConnected()) {
					mqttClient.disconnect();
				}
			} finally {
				mqttClient.close();
			}
		}
	}

	/**
	 * Registers two logging channels through the message queue factory and logs
	 * incoming messages until the operator quits. The channels are unregistered
	 * either by the shutdown hook (CTRL-C) or inline on the normal path, and the
	 * factory is always closed. Any failure propagates to JUnit and fails the test.
	 */
	@SuppressWarnings({ "rawtypes", "unchecked", "resource" })
	@Test
	public void test2() throws Exception {
		ImmutableMap<String,String> m = ImmutableMap.of(MQ_URI, PAHO_MQTT_SCHEMA + "://user:password@" + MQTT_HOST + ":" + DEFAULT_MQTT_PORT);

		final IMessageQueueFactory factory = new MessageQueueFactoryImpl().init((Map)m);
		try{
			final Channel<String> c1 = new LogChannel("chat1");
			final Channel<String> c2 = new LogChannel("chat2");
			// Unregisters both channels; installed as a shutdown hook so CTRL-C still cleans up.
			Thread unregisterHook = new Thread(){

				@Override
				public void run() {
					logger.info("unregister:ch1,ch2");
					factory.getSubscriber().unregister(c1,c2);
				}

			};
			factory.getSubscriber().register(c1,c2);
			Runtime.getRuntime().addShutdownHook(unregisterHook);
			try {
				waitquit();
			} finally {
				// Normal path: drop the hook so it cannot run later against a closed
				// factory, then perform the same unregistration synchronously.
				// run() (not start()) is intentional: the work happens on this thread.
				if (Runtime.getRuntime().removeShutdownHook(unregisterHook)) {
					unregisterHook.run();
				}
			}
		} finally {
			factory.close();
		}
	}

	/** Paho callback that only reports events to the console / logger. */
	class MyCallback implements MqttCallback{
		@Override
		public void connectionLost(Throwable cause) {
			// A reconnect would normally be attempted here after the connection is lost.
			System.out.println("连接断开，可以做重连");
		}
		@Override
		public void deliveryComplete(IMqttDeliveryToken token) {
			logger.info("deliveryComplete---------" + token.isComplete());
		}
		@Override
		public void messageArrived(String topic, MqttMessage message) throws Exception {
			logger.info("接收消息的主题 : " + topic);
			logger.info("接收消息的质量Qos : " + message.getQos());

			// NOTE(review): the payload is decoded with the platform default charset;
			// confirm this matches the encoding used by the publisher.
			String msg = new String(message.getPayload());
			logger.info("msg:" + msg);
		}

	}

	/** Channel that logs every message it receives, prefixed with the channel name. */
	class LogChannel extends Channel<String>{

		protected LogChannel(String name) {
			super(name);
		}
		@Override
		public void onSubscribe(String t) throws SmqUnsubscribeException {
			logger.info(name + " msg:" + t);
		}
	}
}
